parquet, arrow, duckdb

Section Overview

  1. Introduction to Column-Oriented Data Storage

  2. Deep Dive into Parquet

  3. Working with Arrow in R

  4. Querying Parquet with Different Engines

  5. Arrow Datasets for Larger-than-Memory Operations

  6. Partitioning Strategies

  7. Hands-on Workshop: Analysis with PUMS Data

Introduction to Column-Oriented Data Storage

Why should I care about data storage?

Data has to be represented somewhere, both during analysis and when storing.

The shape and characteristics of this representation have a huge impact on performance.

What if you could speed up a key part of your analysis by 30x and reduce your storage by 10x?

Row vs. Column-Oriented Storage

Row-oriented

|ID|Name |Age|City    |
|--|-----|---|--------|
|1 |Alice|25 |New York|
|2 |Bob  |30 |Boston  |
|3 |Carol|45 |Chicago |
  • Efficient for single record access
  • Efficient for appending

Column-oriented

ID:    [1, 2, 3]
Name:  [Alice, Bob, Carol]
Age:   [25, 30, 45]
City:  [New York, Boston, Chicago]
  • Efficient for analytics
  • Better compression

Why Column-Oriented Storage?

  • Analytics typically access a subset of columns
    • “What is the average age by city?”
    • Only needs [Age, City] columns
  • Benefits:
    • Only read needed columns from disk
    • Similar data types stored together
    • Better compression ratios

Column-Oriented Data is great

And you already use column-oriented data frames!

… but you are probably still storing your data in a fundamentally row-oriented format, like CSV.
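In fact, a base R data frame is already a list of column vectors, which is why column operations are cheap. A minimal base R sketch:

```r
# A data.frame is a list of column vectors -- column-oriented in memory
df <- data.frame(
  ID   = c(1L, 2L, 3L),
  Name = c("Alice", "Bob", "Carol"),
  Age  = c(25L, 30L, 45L)
)

is.list(df)   # TRUE: the columns are the list elements
df$Age        # one contiguous vector: 25 30 45
mean(df$Age)  # a column operation never touches Name or ID
```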

The serialization problem

What is Apache Arrow?

  • Cross-language development platform for
    in-memory data
    • Consistent in-memory columnar data format
    • Language-independent
    • Zero-copy reads

What is Apache Arrow?

  • Benefits:
    • Seamless data interchange between systems
    • Fast analytical processing
    • Efficient memory usage

What is Apache Parquet?

  • Open-source columnar storage format
    • Created by Twitter and Cloudera in 2013
    • Part of the Apache Software Foundation

What is Apache Parquet?

  • Features:
    • Columnar storage
    • Explicit schema
    • Statistical metadata
    • Efficient compression
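As a small sketch of the compression feature (assuming the arrow package is installed; zstd support depends on your arrow build, and the data here is made up):

```r
library(arrow)

# Repetitive columns compress very well in a columnar layout
data <- data.frame(x = rep(1:10, 1000), y = rep(letters[1:10], 1000))

f_snappy <- tempfile(fileext = ".parquet")
f_zstd   <- tempfile(fileext = ".parquet")

write_parquet(data, f_snappy)                      # snappy is the default codec
write_parquet(data, f_zstd, compression = "zstd")  # zstd often compresses smaller

file.size(f_snappy)
file.size(f_zstd)
```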

Reading a File

As a CSV file

system.time({
  df <- read.csv("CA_person_2021.csv")
})
   user  system elapsed 
 14.449   0.445  15.037 

Reading a File

As a Parquet file

library(arrow)
options(arrow.use_altrep = FALSE)

system.time({
  df <- read_parquet("CA_person_2021.parquet")
})
   user  system elapsed 
  1.017   0.207   0.568 

Exercise

data <- tibble::tibble(
  integers = 1:10,
  doubles = as.numeric(1:10),
  strings = sprintf("%02d", 1:10)
)

write.csv(data, "numeric_base.csv", row.names = FALSE)
write_csv_arrow(data, "numeric_arrow.csv")
write_parquet(data, "numeric.parquet")

df_csv <- read.csv("numeric_base.csv")
df_csv_arrow <- read_csv_arrow("numeric_arrow.csv")
df_parquet <- read_parquet("numeric.parquet")

Are there any differences?

Exercise (answer)

> df_csv_arrow
# A tibble: 10 × 3
   integers doubles strings
      <int>   <int>   <int>
 1        1       1       1
 2        2       2       2
 3        3       3       3
 4        4       4       4
 5        5       5       5
 6        6       6       6
 7        7       7       7
 8        8       8       8
 9        9       9       9
10       10      10      10
> df_parquet
# A tibble: 10 × 3
   integers doubles strings
      <int>   <dbl> <chr>  
 1        1       1 01     
 2        2       2 02     
 3        3       3 03     
 4        4       4 04     
 5        5       5 05     
 6        6       6 06     
 7        7       7 07     
 8        8       8 08     
 9        9       9 09     
10       10      10 10     

Deep Dive into Parquet

What is inside a Parquet file?

  • Schema metadata
    • Self-describing format
    • Preserves column types
    • Type-safe data interchange
  • The data itself
    • Encodings
    • Advanced compression
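The "self-describing" part is easy to see for yourself; a quick sketch with arrow (assuming it is installed, using a throwaway file):

```r
library(arrow)

f <- tempfile(fileext = ".parquet")
write_parquet(data.frame(id = 1:3, name = c("a", "b", "c")), f)

# The schema travels with the file: column names and types, no guessing on read
read_parquet(f, as_data_frame = FALSE)$schema
```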

Structure of a Parquet File

Benchmarks: Parquet vs CSV

Reading Efficiency: Selecting Columns

  • With CSV:
    • Must read entire file, even if you only need a few columns
    • No efficient way to skip columns during read
  • With Parquet:
    • Read only needed columns from disk
    • Significant performance benefit for wide tables

Reading Efficiency: Selecting Columns

system.time({
  df_subset <- read_parquet(
    "CA_person_2021.parquet", 
    col_select = c("PUMA", "COW")
  )
})
   user  system elapsed 
  0.027   0.003   0.031 
Compare with reading all columns:

   user  system elapsed 
  1.017   0.207   0.568 

nanoparquet vs. arrow Reader

  • nanoparquet
    • Lightweight Parquet reader
    • Minimal dependencies
    • Good for embedding
  • arrow
    • Full-featured reader
    • Support for datasets
    • Integration with Arrow ecosystem

nanoparquet vs. arrow Reader

library(arrow)
options(arrow.use_altrep = FALSE)

system.time({
  df <- read_parquet("CA_person_2021.parquet")
})
   user  system elapsed 
  1.017   0.207   0.568 


library(nanoparquet)

system.time({
  df <- read_parquet("CA_person_2021.parquet")
})
   user  system elapsed 
  0.709   0.099   0.894 

Parquet Tooling Ecosystem

Languages with native Parquet support:

  • R (via arrow, nanoparquet)
  • Python (via pyarrow, pandas)
  • Java
  • C++
  • Rust
  • JavaScript
  • Go

Parquet Tooling Ecosystem

Systems with Parquet integration:

  • DuckDB
  • Google BigQuery
  • Snowflake
  • Amazon Athena
  • Apache Spark
  • Apache Hadoop

Working with Parquet files with Arrow in R

Introduction to the arrow Package

# Install and load the Arrow package
install.packages("arrow")
library(arrow)

# Check Arrow version and capabilities
arrow_info()
  • The arrow package provides:
    • Native R interface to Apache Arrow
    • Tools for working with large datasets
    • Integration with dplyr for data manipulation
    • Reading/writing various file formats

Reading and Writing Parquet files, revisited

# Read a Parquet file into R
data <- read_parquet("CA_person_2021.parquet")

# Write an R data frame to Parquet
write_parquet(data, "CA_person_2021_new.parquet")

# Reading a subset of columns
df_subset <- read_parquet(
  "CA_person_2021.parquet", 
  col_select = c("PUMA", "COW", "AGEP")
)

# Reading with a row filter (predicate pushdown);
# dplyr must be loaded for the filter() verb
library(dplyr)
df_filtered <- open_dataset("CA_person_2021.parquet") |> 
  filter(AGEP > 40) |>
  collect()

Demo: Using dplyr with arrow

# Create an Arrow Table
table <- read_parquet("CA_person_2021.parquet", as_data_frame = FALSE)

# Use dplyr verbs with arrow tables
table |>
  filter(AGEP >= 16) |>
  summarize(
    mean_commute_time = sum(JWMNP * PWGTP, na.rm = TRUE) /
      sum(PWGTP),
    count = n()
  ) |>
  collect()

Querying Parquet with Different Engines

Introduction to DuckDB

  • Analytical SQL database system
    • Embedded database (like SQLite)
    • Column oriented
    • In-process query execution
  • Features:
    • Direct Parquet querying
    • Parallel processing
    • Zero-copy integration with arrow

DuckDB

library(duckdb)

con <- dbConnect(duckdb())

# Register a Parquet file as a virtual table
dbExecute(con, "CREATE VIEW pums AS SELECT * 
                FROM read_parquet('CA_person_2021.parquet')")

# Run our query
dbGetQuery(con, "
  SELECT SUM(JWMNP * PWGTP)/SUM(PWGTP) as avg_commute_time,
         COUNT(*) as count
  FROM pums
  WHERE AGEP >= 16
")

dbDisconnect(con, shutdown = TRUE)

duckplyr

library(duckplyr)

# Read the Parquet file with DuckDB's read_parquet table function
pums_data <- read_file_duckdb(
  "CA_person_2021.parquet", 
  "read_parquet"
)

# Use duckplyr to optimize dplyr operations
pums_data |>
  filter(AGEP >= 16) |>
  summarize(
    mean_commute_time = sum(JWMNP * PWGTP, na.rm = TRUE) /
      sum(PWGTP),
    count = n()
  ) |>
  collect()

data.table

library(arrow)
library(data.table)

# Read Parquet file with Arrow
pums_data <- read_parquet("CA_person_2021.parquet")

# Convert to data.table
pums_dt <- as.data.table(pums_data)

# data.table query
pums_dt[AGEP >= 16,
  .(avg_commute_time = sum(JWMNP * PWGTP, na.rm = TRUE) / sum(PWGTP), 
    count = .N)]

Demo: Seamless Integration Arrow ↔︎ DuckDB

table <- read_parquet("CA_person_2021.parquet", as_data_frame = FALSE)

# Use dplyr verbs with arrow tables
table |>
  filter(AGEP >= 16) |>
  to_duckdb() |>
  summarize(
    mean_commute_time = sum(JWMNP * PWGTP, na.rm = TRUE) /
      sum(PWGTP),
    count = n()
  ) 

Arrow Datasets for Larger-than-Memory Operations

Understanding Arrow Datasets vs. Tables

Arrow Table

  • In-memory data structure
  • Must fit in RAM
  • Fast operations
  • Similar to base data frames
  • Good for single file data

Arrow Dataset

  • Collection of files
  • Lazily evaluated
  • Larger-than-memory capable
  • Parallel, streaming execution
  • Supports partitioning

Demo: Querying Multi-file Datasets

pums_ds <- open_dataset("data/person")

# Examine the dataset, list files
print(pums_ds)
head(pums_ds$files)

# Query execution with lazy evaluation
pums_ds |>
  filter(AGEP >= 16) |>
  group_by(year, ST) |>
  summarize(
    mean_commute_time = sum(JWMNP * PWGTP, na.rm = TRUE) /
      sum(PWGTP),
    count = n()
  ) |>
  collect()

Lazy Evaluation and Query Optimization

  • Lazy evaluation workflow:
    • Define operations (filter, group, summarize)
    • The engine optimizes the plan (predicate pushdown, etc.)
    • Only the necessary data is read from disk
    • Execution happens when collect() is called
  • Benefits:
    • Minimizes memory usage + reduces I/O
    • Leverages Arrow’s native compute functions
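The laziness is visible in a minimal sketch (tiny throwaway dataset; assumes arrow and dplyr are installed):

```r
library(arrow)
library(dplyr)

d <- tempfile()
write_dataset(data.frame(AGEP = c(10, 20, 70), PWGTP = c(1, 2, 3)), d)

q <- open_dataset(d) |> filter(AGEP >= 16)  # builds a query plan; reads no rows yet
class(q)                                    # an arrow query object, not a data frame
collect(q)                                  # the scan and filter execute here
```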

Working with Datasets on S3

arrow can work with data and datasets in cloud storage. This can be a good option if you don’t have access to a formal DBMS.

  • Easy to store
  • arrow efficiently uses metadata to read only what is necessary

Demo: Working with Datasets on S3

pums_ds <- open_dataset("s3://scaling-arrow-pums/person/")

# Query execution with lazy evaluation
pums_ds |>
  filter(year == 2021, location == "ca", AGEP >= 16) |>
  group_by(year, ST) |>
  summarize(
    mean_commute_time = sum(JWMNP * PWGTP, na.rm = TRUE) /
      sum(PWGTP),
    count = n()
  ) |>
  collect()

Demo: Sipping data

pums_ds <- open_dataset("s3://scaling-arrow-pums/person/")

# Query execution with lazy evaluation
pums_ds |>
  filter(AGEP >= 97) |>
  collect()

Partitioning Strategies

What is Partitioning?

  • Dividing data into logical segments
    • Stored in separate files/directories
    • Based on one or more column values
    • Enables efficient filtering
  • Benefits:
    • Faster queries that filter on partition columns
    • Improved parallel processing
    • Easier management of large datasets

Hive vs. Non-Hive Partitioning

Hive Partitioning

  • Directory format: column=value

  • Example:

    person/
    ├── year=2018/
    │   ├── state=NY/
    │   │   └── data.parquet
    │   └── state=CA/
    │       └── data.parquet
    ├── year=2019/
    │   ├── ...
  • Self-describing structure

  • Standard in big data ecosystem

Non-Hive Partitioning

  • Directory format: value

  • Example:

    person/
    ├── 2018/
    │   ├── NY/
    │   │   └── data.parquet
    │   └── CA/
    │       └── data.parquet
    ├── 2019/
    │   ├── ...
  • Column names must be supplied when reading

  • Less verbose directory names
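Reading the two layouts differs only in whether you name the partition columns yourself; a sketch (the directory paths are hypothetical, assumes arrow):

```r
library(arrow)

# Hive-style: column names are inferred from the year=.../state=... directories
pums_hive <- open_dataset("person_hive/")

# Bare-value directories: supply the partition column names yourself
pums_plain <- open_dataset(
  "person_plain/",
  partitioning = c("year", "state")
)
```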

Effective Partitioning Strategies

  • Choose partition columns wisely:
    • Commonly used in filters
    • Low to medium cardinality
  • Common partition dimensions:
    • Time (year, month, day)
    • Geography (country, state, region)
    • Category (product type, department)

Partitioning in Practice: Writing Datasets

ca_pums_data <- read_parquet("CA_person_2021.parquet")

ca_pums_data |>
  mutate(
    age_group = case_when(
      AGEP < 18 ~ "under_18",
      AGEP < 30 ~ "18_29",
      AGEP < 45 ~ "30_44",
      AGEP < 65 ~ "45_64",
      TRUE ~ "65_plus"
    )
  ) |>
  group_by(ST, age_group) |>
  write_dataset(
    path = "ca_pums_by_age/"
  )

Demo: Repartitioning the whole dataset

pums_data <- open_dataset("data/person")

pums_data |>
  mutate(
    age_group = case_when(
      AGEP < 18 ~ "under_18",
      AGEP < 30 ~ "18_29",
      AGEP < 45 ~ "30_44",
      AGEP < 65 ~ "45_64",
      TRUE ~ "65_plus"
    )
  ) |>
  group_by(year, ST, age_group) |>
  write_dataset(
    path = "pums_by_age/"
  )

Best Practices for Partition Design

  • Avoid over-partitioning:
    • Too many small files = poor performance
    • Target file size: 20MB–2GB
    • Avoid high-cardinality columns (e.g., user_id)
  • Consider query patterns:
    • Partition by commonly filtered columns
    • Balance between read speed and write complexity
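write_dataset exposes knobs for keeping files in that size range; a sketch (the argument values are illustrative, not recommendations; assumes arrow and dplyr):

```r
library(arrow)
library(dplyr)

# Cap file and row-group sizes so partitions don't fragment into tiny files
open_dataset("data/person") |>
  group_by(year, ST) |>
  write_dataset(
    path = "pums_sized/",
    max_rows_per_file  = 2e6,  # split very large partitions into multiple files
    min_rows_per_group = 1e5   # avoid pathologically small row groups
  )
```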

Partitioning Performance Impact

open_dataset("<path/to/data>") |>
  filter(year >= 2018) |>
  summarise(
    mean_commute = sum(JWMNP * PWGTP, na.rm = TRUE) / sum(PWGTP)
  ) |>
  collect()

Hands-on practice: Analysis with PUMS Data

Conclusion

  • Column-oriented storage formats like Parquet provide massive performance advantages for analytical workloads (30x speed, 10x smaller files)
  • Partitioning strategies help manage large datasets effectively when working with data too big for memory

Conclusion

  • Apache Arrow enables seamless data interchange between systems without costly serialization/deserialization
  • Multiple query engines (arrow, DuckDB, data.table) offer flexibility depending on your analysis needs, all using modern formats like Parquet

Conclusion

Resources:

Questions?